package com.powernode.goods.service;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Simulated goods/inventory service: subscribes to an order topic on RocketMQ and logs the
 * stock deduction described by each received message.
 *
 * <p>Each message body is parsed as text of the form {@code goodsId:quantity}. The body is
 * decoded as UTF-8 (NOTE(review): assumes the producer encodes with UTF-8 — confirm).
 */
@Service
@Slf4j
public class GoodsServiceImpl {

    /** Consumer group used when creating the push consumer. */
    private static final String CONSUMER_GROUP = "consumerGroup11";

    /** Name server address the consumer connects to. */
    private static final String NAME_SERVER_ADDRESS = "192.168.100.110:9876";

    /** Topic carrying the order messages; all tags are subscribed. */
    private static final String ORDER_TOPIC = "orderTopic1";

    /** Artificial per-message delay, in milliseconds, standing in for real work. */
    private static final long SIMULATED_WORK_MILLIS = 800L;

    /** The consumer started by {@link #decr()}, kept so it can be shut down later. */
    private volatile DefaultMQPushConsumer consumer;

    /**
     * Creates, configures and starts the push consumer for the order topic.
     *
     * <p>Annotated with {@code @PostConstruct}, so the container invokes it once the bean has
     * been constructed. The return type and {@code throws} clause are kept as they were for
     * compatibility with any direct callers.
     *
     * @return the literal string {@code "success"} once the consumer has been started
     * @throws InterruptedException declared for signature compatibility
     * @throws MQClientException if subscribing to the topic or starting the consumer fails
     */
    @PostConstruct
    public String decr() throws InterruptedException, MQClientException {
        // This is demo code, so the consumer is simply wired up inline here.
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        pushConsumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        pushConsumer.subscribe(ORDER_TOPIC, "*");
        pushConsumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    handleMessage(message);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        pushConsumer.start();
        // Remember the consumer only after a successful start so shutdown() never
        // touches an instance that failed to start.
        consumer = pushConsumer;
        return "success";
    }

    /**
     * Shuts down the consumer started by {@link #decr()}, if any, so that it does not
     * outlive this bean. Safe to call when no consumer was started.
     */
    @PreDestroy
    public void shutdown() {
        DefaultMQPushConsumer running = consumer;
        if (running != null) {
            consumer = null;
            running.shutdown();
        }
    }

    /**
     * Handles one message: parses {@code goodsId:quantity}, waits for the simulated work
     * delay, and logs the deduction.
     *
     * <p>A message whose body is missing or does not contain both parts is logged and
     * skipped instead of throwing, because retrying it could never succeed.
     *
     * @param message the received message
     */
    private void handleMessage(MessageExt message) {
        byte[] body = message.getBody();
        // Decode with an explicit charset instead of the platform default.
        String payload = body == null ? "" : new String(body, StandardCharsets.UTF_8);
        String[] parts = payload.split(":");
        if (parts.length < 2) {
            log.warn("Skipping malformed stock message, expected 'goodsId:quantity': msgId={}, body={}",
                    message.getMsgId(), payload);
            return;
        }
        try {
            Thread.sleep(SIMULATED_WORK_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe it.
            Thread.currentThread().interrupt();
            log.warn("Interrupted during simulated stock deduction delay", e);
        }
        log.info("商品id为{}被扣减库存{}", parts[0], parts[1]);
    }

}
